# @Time : 2024/6/13 14:50
# @Author : ZHUYI
# @File : consumer1_mysql
from kafka import KafkaConsumer
import json
import DB_connection


def consumer1(topic):
    print("consumer1开始消费。。。。")
    # 订阅主题，使用本地Kafka服务器，从最早的消息开始消费
    consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'],
                             group_id=None, auto_offset_reset='earliest')
    # 建立数据库连接
    conn, cursor = DB_connection.db_connection()
    for msg in consumer:
        msg1 = str(msg.value, encoding="utf-8")
        # 两次json.loads处理嵌套的JSON字符串
        json_dict = json.loads(json.loads(msg1))
        # 插入数据
        for d in json_dict:
            for k, v in d.items():
                # 当键为"data"时，进一步处理
                if k == "data":
                    for i in v:
                        # 将字典转换为JSON字符串再转为字典，纠正格式
                        value = json.loads(json.dumps(i))
                        # 提取相关信息
                        base = value['base_info']
                        user = value['user_info']
                        # 掘金小册的具体信息
                        id = base['booklet_id']
                        title = base['title']
                        summary = base['summary']
                        author = user['user_name']
                        job_title = user['job_title']
                        cover_img = base['cover_img']
                        price = value['max_discount']['price']
                        buy_count = base['buy_count']
                        is_finished = base['is_finished']
                        # 执行SQL语句，将数据插入到数据库中
                        cursor.execute("INSERT INTO juejin VALUES (%s, %s, %s, %s, %s, %s,%s, %s, %s)",
                                       (id, title, summary, author, job_title, cover_img, price, buy_count, is_finished))
                        conn.commit()
                        print(title + ' 成功插入数据')
        consumer.close()
    # 关闭数据库连接
    conn.close()
    print("consumer1消费结束。。。。")



